# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, abstractmethod
import hashlib
import numpy as np
from hysop.tools.htypes import check_instance, to_tuple, first_not_None
from hysop.tools.numerics import default_invalid_value
from hysop.tools.mpi_utils import iter_mpi_requests, dtype_to_mpi_type
from hysop.constants import (
GhostOperation,
GhostMask,
Backend,
DirectionLabels,
ExchangeMethod,
)
from hysop.topology.topology import Topology, TopologyView
from hysop.topology.cartesian_topology import CartesianTopologyView
from hysop.core.arrays.all import HostArray
from hysop.core.mpi import MPI
from hysop.core.mpi.topo_tools import TopoTools
from hysop.testsenv import __HAS_OPENCL_BACKEND__
if __HAS_OPENCL_BACKEND__:
from hysop.core.arrays.all import OpenClArray
from hysop.backend.device.opencl import cl, clArray
from hysop.backend.device.opencl.opencl_kernel_launcher import HostLauncherI
from hysop.constants import BoundaryCondition, BoundaryConditionConfig
GHOST_EXCHANGE_DEBUG_LEVEL = 0
[docs]
def gprint(*args, **kwds):
level = kwds.pop("level", 2)
if GHOST_EXCHANGE_DEBUG_LEVEL >= level:
print(*args, **kwds)
[docs]
def gprint_buffer(msg, buf, *args, **kwds):
no_data = kwds.pop("no_data", False)
if isinstance(buf, list):
mpi_type = buf[1]
buf = buf[0]
else:
mpi_type = None
gprint(
"{}: mpi_type={}, shape={}, dtype={}, c_contiguous={}, f_contiguous={}".format(
msg,
mpi_type,
buf.shape,
buf.dtype,
buf.flags.c_contiguous,
buf.flags.f_contiguous,
)
)
if no_data:
return
gprint(" " + "\n ".join(str(buf).split("\n")))
[docs]
class LocalBoundaryExchanger:
"""
Helper class to generate symmetric and antisymmetric local ghost exchangers.
This is used for non-periodic boundary conditions:
HOMOGENEOUS_DIRICHLET: antisymmetric ghost exchange
HOMOGENEOUS_NEUMANN: symmetric ghost exchange
"""
[docs]
@classmethod
def build_exchanger(cls, shape, direction, H, to_left):
shape = to_tuple(shape)
assert isinstance(direction, int)
assert isinstance(H, tuple)
ndim = len(shape)
S = shape[direction]
G = (S - 1) // 2
H = np.asarray(H).copy()
assert direction < ndim
assert S % 2 == 1
assert G > 0
assert H.size == G + 1
def mk_slc(*args, **kwds):
if "extend" in kwds:
slices = [np.newaxis] * ndim
else:
slices = [slice(None, None)] * ndim
slices[direction] = slice(*args)
return tuple(slices)
if to_left:
src_slc = mk_slc(G, S, +1)
dst_slc = mk_slc(0, G + 1, +1)
islc = mk_slc(None, None, None)
oslc = mk_slc(None, None, -1)
else:
src_slc = mk_slc(0, G + 1, +1)
dst_slc = mk_slc(G, S, +1)
islc = mk_slc(None, None, -1)
oslc = mk_slc(None, None, None)
hslc = mk_slc(0, H.size, 1, extend=True)
H = H[hslc]
def exchange_ghosts(X):
assert X is not None
assert X.shape == shape
X[dst_slc][oslc] = H * X[src_slc][islc]
return exchange_ghosts
[docs]
@classmethod
def build_symmetric_exchanger(cls, shape, direction, to_left):
S = (shape[direction] + 1) // 2
H = (1,) * S
return cls.build_exchanger(
shape=shape, direction=direction, H=H, to_left=to_left
)
[docs]
@classmethod
def build_antisymmetric_exchanger(cls, shape, direction, to_left):
S = (shape[direction] + 1) // 2
H = (0,) + (-1,) * (S - 1)
return cls.build_exchanger(
shape=shape, direction=direction, H=H, to_left=to_left
)
[docs]
@classmethod
def build_scalar_exchanger(cls, value, shape, direction, to_left=None):
if to_left is None:
def exchange_ghosts(X, value=value):
assert X is not None
X[...] = value
else:
ndim = len(shape)
def mk_slc(*args, **kwds):
if "extend" in kwds:
slices = [np.newaxis] * ndim
else:
slices = [slice(None, None)] * ndim
slices[direction] = slice(*args)
return tuple(slices)
S = shape[direction]
G = (S - 1) // 2
if to_left:
dst_slc = mk_slc(0, G + 1, +1)
else:
dst_slc = mk_slc(G, S, +1)
def exchange_ghosts(X, value=value):
assert X is not None
X[dst_slc] = value
return exchange_ghosts
[docs]
class GhostExchangerI(metaclass=ABCMeta):
"""Abstract interface for a ghost exchanger."""
[docs]
@abstractmethod
def exchange_ghosts(self, **kwds):
pass
def __call__(self, **kwds):
return self.exchange_ghosts(**kwds)
[docs]
class MultiGhostExchanger(GhostExchangerI):
"""Handle multiple ghost exchangers."""
def __init__(self, name):
self.name = name
self.kind = None
self._exchangers = ()
self._launcher = None
def __iadd__(self, other):
assert self._launcher is None, "launcher already built."
if other is None:
return self
if isinstance(other, GhostExchanger):
if other.kind is None:
return
if self.kind is None:
self.kind = other.kind
assert self.kind == other.kind
self._exchangers += (other,)
elif isinstance(other, MultiGhostExchanger):
if other.kind is None:
return
assert self.kind == other.kind
self._exchangers += other._exchangers
else:
msg = f"Uknown type {type(other)}."
raise TypeError(msg)
return self
[docs]
def exchange_ghosts(self, **kwds):
"""Exchange ghosts on specified data."""
assert self._exchangers
if self._launcher is None:
self._build_launcher()
assert self._launcher is not None
if self.kind is Backend.OPENCL:
kwds.setdefault("queue", self._exchangers[0].cl_env.default_queue)
return self._launcher(**kwds)
def _build_launcher(self):
if self._launcher is not None:
return self._launcher
elif not self._exchangers:
return None
for g in self._exchangers:
g._build_launcher()
if self.kind is Backend.HOST:
def _launcher(**kwds):
for g in self._exchangers:
g._launcher(**kwds)
self._launcher = _launcher
elif self.kind is Backend.OPENCL:
from hysop.backend.device.opencl.opencl_kernel_launcher import (
OpenClKernelListLauncher,
)
name = f"multighost_exchanger_{self.name}"
kl = OpenClKernelListLauncher(name=name)
for g in self._exchangers:
kl += g._launcher
self._launcher = kl
else:
msg = f"Unknown array backend kind {self.kind}."
raise NotImplementedError(msg)
assert self._launcher is not None
return self._launcher
[docs]
class GhostExchanger(GhostExchangerI):
"""Prepare a backend specific ghost exchange, possibly on multiple data."""
def __init__(self, name, topology, data, exchange_method, ghost_op, ghost_mask):
check_instance(name, str)
check_instance(topology, TopologyView)
check_instance(data, tuple, minsize=1)
check_instance(exchange_method, ExchangeMethod)
check_instance(ghost_op, GhostOperation)
check_instance(ghost_mask, GhostMask)
dtype = data[0].dtype
for d in data[1:]:
if d.dtype != dtype:
msg = "All data should have the same dtype, got {} and {}."
msg = msg.format(dtype, d.dtype)
raise RuntimeError(msg)
base_dtype = dtype
base_mpi_type = dtype_to_mpi_type(dtype)
self.topology = topology
self.data = data
self.mesh = topology.mesh
self.backend = (
data[0].backend if hasattr(data[0], "backend") else topology.backend
)
self.host_backend = self.backend.host_array_backend
self.base_tag = int(hashlib.sha1(name.encode("utf-8")).hexdigest(), 16) % (
104729
)
self.exchange_method = exchange_method
self.ghost_op = ghost_op
self.ghost_mask = ghost_mask
self.base_dtype = base_dtype
self.base_mpi_type = base_mpi_type
self.name = name
[docs]
class CartesianDiscreteFieldGhostExchanger(GhostExchanger):
def __init__(
self,
name,
topology,
data,
kind=None,
directions=None,
ghosts=None,
ghost_op=None,
ghost_mask=None,
exchange_method=None,
global_lboundaries_config=None,
global_rboundaries_config=None,
):
"""
By default, we exchange all ghosts, including diagonals.
This is done by setting ghost_mask to GhostMask.FULL.
Just Cartesian Communicator neighbours are
considered here (there is no direct communication
between diagonal processes).
X P X
^
P < P > P
v
X P X
Diagonal ghosts are exchanged by chaining exchanges on two
or more axes.
If ghost_mask is set to GhostMask.CROSS, diagonal ghosts
are set to NAN to ensure they are not used.
Boundary conditions are hidden in the topology parameter:
(PERIODIC/PERIODIC) => standard periodic ghost exchange on the domain boundary
standard periodic ghost accumulation
(XXX/YYY) => symmetric or antisymmetric ghost exchange
ghost accumulation is a noop
Here XXX and YYY are either HOMOGENEOUS_DIRICHLET or HOMOGENEOUS_NEUMANN.
"""
gprint(
"CartesianGhostExchanger.__init__(name={}, \n topology={}, kind={}, directions={}, ghosts={}, exchange_method={}, ghost_op={}, ghost_mask={}]".format(
name,
topology.full_tag,
kind,
directions,
ghosts,
exchange_method,
ghost_op,
ghost_mask,
)
)
ghost_op = first_not_None(ghost_op, GhostOperation.EXCHANGE)
check_instance(ghost_op, GhostOperation)
ghost_mask = first_not_None(ghost_mask, GhostMask.FULL)
check_instance(ghost_mask, GhostMask)
exchange_method = first_not_None(
exchange_method, ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V
)
check_instance(exchange_method, ExchangeMethod)
super().__init__(
topology=topology,
data=data,
name=name,
exchange_method=exchange_method,
ghost_op=ghost_op,
ghost_mask=ghost_mask,
)
mesh = self.mesh
dim = mesh.dim
directions = to_tuple(first_not_None(directions, range(dim)))
ghosts = to_tuple(first_not_None(ghosts, mesh.ghosts))
check_instance(topology, CartesianTopologyView)
check_instance(directions, tuple, values=int)
check_instance(ghosts, tuple, values=int, minval=0)
assert len(directions) > 0
assert len(set(directions)) == len(directions)
assert len({id(d) for d in data}) == len(data)
assert all(0 <= d < dim for d in directions)
assert len(ghosts) == dim or len(ghosts) == 1
if len(ghosts) == 1:
ghosts = tuple(ghosts[0] if (i in directions) else 0 for i in range(dim))
self.directions = directions
self.outer_ghosts = mesh.get_local_outer_ghost_slices(
ghosts=ghosts, ghost_mask=ghost_mask
)
self.inner_ghosts = mesh.get_local_inner_ghost_slices(
ghosts=ghosts, ghost_mask=ghost_mask
)
self.boundary_layers = mesh.get_boundary_layer_slices(
ghosts=ghosts, ghost_mask=ghost_mask
)
self.all_inner_ghost_slices = mesh.get_all_local_inner_ghost_slices(
ghosts=ghosts
)
self.all_outer_ghost_slices = mesh.get_all_local_outer_ghost_slices(
ghosts=ghosts
)
self.dim = dim
self.ghosts = ghosts
# check that enforced boundaries kind matches topology boundaries
if global_lboundaries_config is not None:
global_lboundaries = np.asarray(
tuple(map(lambda x: x.bc, global_lboundaries_config))
)
assert (global_lboundaries == mesh.global_lboundaries).all(), (
global_lboundaries,
mesh.global_lboundaries,
)
if global_rboundaries_config is not None:
global_rboundaries = np.asarray(
tuple(map(lambda x: x.bc, global_rboundaries_config))
)
assert (global_rboundaries == mesh.global_rboundaries).all(), (
global_rboundaries,
mesh.global_rboundaries,
)
self.global_lboundaries_config = global_lboundaries_config
self.global_rboundaries_config = global_rboundaries_config
kind = first_not_None(kind, topology.backend.kind)
if kind == Backend.HOST:
for d in data:
assert isinstance(d, (np.ndarray, HostArray)), type(d)
elif __HAS_OPENCL_BACKEND__ and (kind == Backend.OPENCL):
for d in data:
assert isinstance(d, (clArray.Array, OpenClArray)), type(d)
else:
msg = f"Unknown topology array backend kind {kind}."
raise ValueError(msg)
self.kind = kind
self._launcher = None
def fmt_slices(slices, prefix="\n "):
s = ""
for i, (lslcs, rslcs, shape) in enumerate(slices):
s += f"{prefix}direction {i} || LEFT: {lslcs} || RIGHT: {rslcs} || SHAPE: {shape}"
return s
msg = """
TOPOLOGY INFO:
dim: {}
topology.shape: {}
topology.coords: {}
topology.lboundary: {}
topology.rboundary: {}
EXCHANGE INFO
kind: {}
directions: {}
ghosts: {}
outer_ghosts: {}
inner_ghosts: {}
boundary_layers: {}
global lboundaries: {}
global rboundaries: {}
""".format(
self.dim,
topology.proc_shape,
topology.proc_coords,
mesh.is_at_left_boundary,
mesh.is_at_right_boundary,
self.kind,
self.directions,
self.ghosts,
fmt_slices(self.outer_ghosts),
fmt_slices(self.inner_ghosts),
self.boundary_layers,
global_lboundaries_config,
global_rboundaries_config,
)
gprint(msg)
[docs]
def exchange_ghosts(self, **kwds):
"""Exchange ghosts on specified data."""
if self._launcher is None:
self._build_launcher()
assert self._launcher is not None
if self.kind is Backend.OPENCL:
kwds.setdefault("queue", self.backend.cl_env.default_queue)
return self._launcher(**kwds)
def _build_launcher(self):
if self._launcher is not None:
return self._launcher
if self.kind is Backend.HOST:
self._build_python_launcher()
elif self.kind is Backend.OPENCL:
self._build_opencl_launcher()
else:
msg = f"Unknown array backend kind {self.kind}."
raise NotImplementedError(msg)
assert self._launcher is not None
return self._launcher
def _prepare_launcher(self):
class _LauncherParameters:
def __init__(self):
# Call kwds depends on exchange method used (Send-Recv, Neighbor_AlltoAll, ...)
# SEND-RECV
# > one call per direction, per data buffer + (left + right)
self.isend_kwds = []
self.irecv_kwds = []
self.i_dst_buffers = []
self.i_src_buffers = []
# NEIGHBOR_ALL_TO_ALL_V
# > only one call for all buffers
self.v_kwds = None
self.v_send_requests = {}
self.v_recv_requests = {}
self.v_src_buffers = ()
self.v_dst_buffers = ()
self.v_send_buffer = None
self.v_recv_buffer = None
# NEIGHBOR_ALL_TO_ALL_W
# > one call per data buffer
self.w_kwds = []
self.w_src_buffers = []
self.w_dst_buffers = []
self.w_send_buffer = []
self.w_recv_buffer = []
# COMMON PARAMETERS
self.local_exchanges = []
self.local_symmetries = []
self.diagonal_ghosts = []
self.has_mpi_exchanges = False
self.from_buffer = None # should source data be bufferized ?
self.to_buffer = None # should target data be bufferized ?
def setup(self):
local_symmetries = []
for ls in self.local_symmetries:
(buf, slices, shape, d, to_left, boundary) = ls
bc = boundary.bc
if bc is BoundaryCondition.HOMOGENEOUS_DIRICHLET:
if isinstance(boundary, BoundaryConditionConfig) and (
boundary.data is not None
):
msg = "Boundary of type HOMOGENEOUS_DIRICHLET does not handle custom boundary data, got {}."
raise RuntimeError(msg, type(boundary.data).__name__)
else:
fn = LocalBoundaryExchanger.build_antisymmetric_exchanger(
shape=shape, direction=d, to_left=to_left
)
elif bc is BoundaryCondition.HOMOGENEOUS_NEUMANN:
if isinstance(boundary, BoundaryConditionConfig) and (
boundary.data is not None
):
# allow to force boundary ghosts to a specific scalar value
if isinstance(boundary.data, (int, float, np.number)):
fn = LocalBoundaryExchanger.build_scalar_exchanger(
value=boundary.data, shape=shape, direction=d
)
else:
msg = "Boundary of type HOMOGENEOUS_NEUMANN only handle custom scalar boundary data, got {}."
raise RuntimeError(msg, type(boundary.data).__name__)
else:
fn = LocalBoundaryExchanger.build_symmetric_exchanger(
shape=shape, direction=d, to_left=to_left
)
elif bc is BoundaryCondition.DIRICHLET:
if isinstance(boundary.data, (int, float, np.number)):
fn = LocalBoundaryExchanger.build_scalar_exchanger(
value=boundary.data,
shape=shape,
direction=d,
to_left=to_left,
)
else:
msg = "Boundary of type DIRICHLET only handle custom scalar boundary data, got {}."
raise RuntimeError(msg, type(boundary.data).__name__)
else:
msg = f"Unknown boundary condition {bc}."
raise RuntimeError(msg, type(boundary).__name__)
local_symmetries.append((fn, buf, slices, shape, d, to_left, bc))
self.local_symmetries = local_symmetries
return self
def msg_tag(i, local_rank, target_rank, d, direction):
tag = self.base_tag
tag += (i + 1) * 7919
tag += (local_rank + 1) * 967
tag += (target_rank + 1) * 103
tag += (d + 1) * 97
tag += (direction + 1) * 17
return tag
dim = self.dim
ghosts = self.ghosts
ghost_op = self.ghost_op
comm = self.topology.comm
local_rank = self.topology.cart_rank
neighbour_ranks = self.topology.proc_neighbour_ranks
proc_shape = self.topology.proc_shape
base_dtype = self.base_dtype
base_mpi_type = self.base_mpi_type
exchange_method = self.exchange_method
global_lboundaries_config = self.global_lboundaries_config
global_rboundaries_config = self.global_rboundaries_config
mesh = self.topology.mesh
is_at_left_boundary = mesh.is_at_left_boundary
is_at_right_boundary = mesh.is_at_right_boundary
left_boundaries_kind = mesh.local_lboundaries
right_boundaries_kind = mesh.local_rboundaries
src_data_on_device = self.kind is not Backend.HOST
dst_data_on_device = self.kind is not Backend.HOST
all_outer_ghost_slices = self.all_outer_ghost_slices
lp = _LauncherParameters()
assert self.data
for i, buf in enumerate(self.data):
lp.w_send_requests = {}
lp.w_recv_requests = {}
if (
(ghost_op is GhostOperation.EXCHANGE)
and (dim > 1)
and (self.ghost_mask is GhostMask.CROSS)
):
# set all outer diagonal ghosts to NAN
for directions in all_outer_ghost_slices[dim]:
if all(ghosts[d] > 0 for d in directions):
for displacements in all_outer_ghost_slices[dim][directions]:
ndisplacements = sum(d != 0 for d in displacements)
if ndisplacements < 2:
continue
slc, shape = all_outer_ghost_slices[dim][directions][
displacements
]
value = default_invalid_value(dtype=base_dtype)
lp.diagonal_ghosts.append((buf, slc, shape, value))
assert self.directions
assert any(ghosts[d] > 0 for d in self.directions)
for d in self.directions:
if ghosts[d] == 0:
continue
lboundary = left_boundaries_kind[d]
rboundary = right_boundaries_kind[d]
at_left = is_at_left_boundary[d]
at_right = is_at_right_boundary[d]
lnone = lboundary is BoundaryCondition.NONE
rnone = rboundary is BoundaryCondition.NONE
lperiodic = lboundary is BoundaryCondition.PERIODIC
rperiodic = rboundary is BoundaryCondition.PERIODIC
should_exchange_to_left = lnone or lperiodic
should_exchange_to_right = rnone or rperiodic
assert at_left ^ lnone
assert at_right ^ rnone
inner_left, inner_right, shape = self.inner_ghosts[d]
outer_left, outer_right, shape = self.outer_ghosts[d]
left_boundary_layer, right_boundary_layer, bl_shape = (
self.boundary_layers[d]
)
assert (left_boundary_layer is None) ^ (at_left and not lperiodic)
assert (right_boundary_layer is None) ^ (at_right and not rperiodic)
left_rank = neighbour_ranks[0, d]
right_rank = neighbour_ranks[1, d]
nprocs = proc_shape[d]
lp.has_mpi_exchanges |= nprocs > 1
assert (
(nprocs == 1) or should_exchange_to_left or should_exchange_to_right
)
msg = """ DATA {} EXCHANGES FOR DIRECTION {}:
nprocs (directional procs): {}
src_data_on_device: {}
dst_data_on_device: {}
local_rank: {}
left_rank / right_rank: {} / {}
at_left / at_right: {} / {}
lboundary / rboundary: {} / {}
xchg_to_left / xchg_to_right: {} / {}
inner_left / inner_right: {} / {}
outer_left / outer_right: {} / {}
associated_shape: {}
left_blayer / right_blayer: {} / {}
boundary_layer_shape: {}
Send/Receive:""".format(
i,
d,
nprocs,
src_data_on_device,
dst_data_on_device,
local_rank,
left_rank,
right_rank,
at_left,
at_right,
lboundary,
rboundary,
should_exchange_to_left,
should_exchange_to_right,
inner_left,
inner_right,
outer_left,
outer_right,
shape,
left_boundary_layer,
right_boundary_layer,
bl_shape,
)
gprint(msg)
if not should_exchange_to_left:
glboundary = (
global_lboundaries_config[d]
if (global_lboundaries_config is not None)
else lboundary
)
assert glboundary.bc is lboundary
lp.local_symmetries.append(
(buf, left_boundary_layer, bl_shape, d, 1, glboundary)
)
if not should_exchange_to_right:
grboundary = (
global_rboundaries_config[d]
if (global_rboundaries_config is not None)
else rboundary
)
assert grboundary.bc is rboundary
lp.local_symmetries.append(
(buf, right_boundary_layer, bl_shape, d, 0, grboundary)
)
if nprocs == 1:
# We need to exchange with ourselves (by periodicity)
assert at_left and at_right
assert should_exchange_to_left == should_exchange_to_right
should_exchange = should_exchange_to_left
if should_exchange:
lp.local_exchanges.append(
(buf, outer_right, inner_left, shape, d)
)
lp.local_exchanges.append(
(buf, outer_left, inner_right, shape, d)
)
lp.from_buffer = first_not_None(lp.from_buffer, False)
lp.to_buffer = first_not_None(lp.to_buffer, False)
elif ghost_op is GhostOperation.EXCHANGE:
# SEND DIRECTION IS
# INNER GHOSTS ---> OUTER GHOSTS
# OPERATION IS
# OUTER_GHOSTS[...] = INNER_GHOSTS
if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V:
# Send and receive every buffer at once to neighbours
# /!\ we send and receive all data components at once
if nprocs == 2:
# switch left and right in 2 proc periodic case
outer_right, outer_left = outer_left, outer_right
if should_exchange_to_left:
lp.v_send_requests.setdefault(
("left", left_rank), []
).append(buf[inner_left])
lp.v_recv_requests.setdefault(
("left", left_rank), []
).append(buf[outer_left])
if should_exchange_to_right:
lp.v_send_requests.setdefault(
("right", right_rank), []
).append(buf[inner_right])
lp.v_recv_requests.setdefault(
("right", right_rank), []
).append(buf[outer_right])
lp.from_buffer = True
lp.to_buffer = True
elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W:
# Send and receive to and from every neigbours
# /!\ only one buffer at a time
if nprocs == 2:
# switch left and right in 2 proc periodic case
outer_right, outer_left = outer_left, outer_right
if should_exchange_to_left:
lp.w_send_requests.setdefault(
("left", left_rank), []
).append((buf, inner_left))
lp.w_recv_requests.setdefault(
("left", left_rank), []
).append((buf, outer_left))
if should_exchange_to_right:
lp.w_send_requests.setdefault(
("right", right_rank), []
).append((buf, inner_right))
lp.w_recv_requests.setdefault(
("right", right_rank), []
).append((buf, outer_right))
lp.from_buffer = src_data_on_device
lp.to_buffer = dst_data_on_device
elif exchange_method is ExchangeMethod.ISEND_IRECV:
# Exchanges with left neighour
if should_exchange_to_left:
# send inner left to left rank
assert (left_rank != local_rank) and (
left_rank != -1
), left_rank
sendtag = msg_tag(i, local_rank, left_rank, d, 0)
if src_data_on_device:
tmp = self.host_backend.empty(
shape=shape, dtype=base_dtype
)
send_buf = tmp.handle
mpi_type = base_mpi_type.Create_contiguous(
send_buf.size
)
mpi_type.Commit()
lp.i_src_buffers += ((tmp, buf, inner_left),)
else:
send_buf = buf.handle
mpi_type = TopoTools.create_subarray_from_buffer(
send_buf, inner_left
)
send_kwds = {
"buf": [send_buf, mpi_type],
"dest": left_rank,
"tag": sendtag,
}
lp.isend_kwds.append(send_kwds)
msg = "\t\t>P{}.ISend(shape={}, dtype={}, tag={}) inner left data to left neighbour process P{}."
msg = msg.format(
local_rank, shape, buf.dtype, sendtag, left_rank
)
gprint(msg)
# receive outer right from left rank
recvtag = msg_tag(i, left_rank, local_rank, d, 1)
if dst_data_on_device:
tmp = self.host_backend.empty(
shape=shape, dtype=base_dtype
)
recv_buf = tmp.handle
mpi_type = base_mpi_type.Create_contiguous(
recv_buf.size
)
mpi_type.Commit()
lp.i_dst_buffers += ((tmp, buf, outer_left),)
else:
recv_buf = buf.handle
mpi_type = TopoTools.create_subarray_from_buffer(
recv_buf, outer_left
)
recv_kwds = {
"buf": [recv_buf, mpi_type],
"source": left_rank,
"tag": recvtag,
}
lp.irecv_kwds.append(recv_kwds)
msg = "\t\t>P{}.IRecv(shape={}, dtype={}, tag={}) outer left data from left neighbour process P{}."
msg = msg.format(
local_rank, shape, buf.dtype, recvtag, left_rank
)
gprint(msg)
if should_exchange_to_right:
# Exchanges with right neighour
assert (right_rank != local_rank) and (right_rank != -1)
# send inner right to right rank
sendtag = msg_tag(i, local_rank, right_rank, d, 1)
if src_data_on_device:
tmp = self.host_backend.empty(
shape=shape, dtype=base_dtype
)
send_buf = tmp.handle
mpi_type = base_mpi_type.Create_contiguous(
send_buf.size
)
mpi_type.Commit()
lp.i_src_buffers += ((tmp, buf, inner_right),)
else:
send_buf = buf.handle
mpi_type = TopoTools.create_subarray_from_buffer(
send_buf, inner_right
)
send_kwds = {
"buf": [send_buf, mpi_type],
"dest": right_rank,
"tag": sendtag,
}
lp.isend_kwds.append(send_kwds)
msg = "\t\t>P{}.ISend(shape={}, dtype={}, tag={}) inner right data to right neighbour process P{}."
msg = msg.format(
local_rank, shape, buf.dtype, sendtag, right_rank
)
gprint(msg)
# receive outer left from right rank
recvtag = msg_tag(i, right_rank, local_rank, d, 0)
if dst_data_on_device:
tmp = self.host_backend.empty(
shape=shape, dtype=base_dtype
)
recv_buf = tmp.handle
mpi_type = base_mpi_type.Create_contiguous(
recv_buf.size
)
mpi_type.Commit()
lp.i_dst_buffers += ((tmp, buf, outer_right),)
else:
recv_buf = buf.handle
mpi_type = TopoTools.create_subarray_from_buffer(
recv_buf, outer_right
)
recv_kwds = {
"buf": [recv_buf, mpi_type],
"source": right_rank,
"tag": recvtag,
}
lp.irecv_kwds.append(recv_kwds)
msg = "\t\t>P{}.IRecv(shape={}, dtype={}, tag={}) outer right data from right neighbour process P{}."
msg = msg.format(
local_rank, shape, buf.dtype, recvtag, right_rank
)
gprint(msg)
lp.from_buffer = src_data_on_device
lp.to_buffer = dst_data_on_device
else:
msg = f"Unknown MPI exchange method {exchange_method}."
raise NotImplementedError(msg)
elif ghost_op in (GhostOperation.ACCUMULATE,):
# FOR PERIODIC OR NONE BOUNDARIES:
# SEND DIRECTION IS
# OUTER GHOSTS ---> TMP BUFFER
# OPERATION IS
# INNER GHOSTS = OP(INNER_GHOSTS, OUTER_GHOSTS)
# ELSE OPERATION IS
# OUTER_GHOSTS[...] = 0
if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V:
# Send and receive every buffer at once to neighbours
# /!\ we send and receive all data components at once
left_rank = neighbour_ranks[0, d]
right_rank = neighbour_ranks[1, d]
if nprocs == 2:
# switch left and right in 2 proc periodic case
inner_right, inner_left = inner_left, inner_right
if should_exchange_to_left:
lp.v_send_requests.setdefault(
("left", left_rank), []
).append(buf[outer_left])
lp.v_recv_requests.setdefault(
("left", left_rank), []
).append(buf[inner_left])
if should_exchange_to_right:
lp.v_send_requests.setdefault(
("right", right_rank), []
).append(buf[outer_right])
lp.v_recv_requests.setdefault(
("right", right_rank), []
).append(buf[inner_right])
lp.from_buffer = True
lp.to_buffer = True
elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W:
# Send and receive to and from every neigbours
# /!\ only one buffer at a time
if nprocs == 2:
# switch left and right in 2 proc periodic case
inner_right, inner_left = inner_left, inner_right
left_rank = neighbour_ranks[0, d]
right_rank = neighbour_ranks[1, d]
if should_exchange_to_left:
lp.w_send_requests.setdefault(
("left", left_rank), []
).append((buf, outer_left))
lp.w_recv_requests.setdefault(
("left", left_rank), []
).append((buf, inner_left))
if should_exchange_to_right:
lp.w_send_requests.setdefault(
("right", right_rank), []
).append((buf, outer_right))
lp.w_recv_requests.setdefault(
("right", right_rank), []
).append((buf, inner_right))
lp.from_buffer = src_data_on_device
lp.to_buffer = True
elif exchange_method is ExchangeMethod.ISEND_IRECV:
if should_exchange_to_left:
# Exchanges with left neighour
left_rank = neighbour_ranks[0, d]
assert (left_rank != local_rank) and (left_rank != -1)
# send outer left to left rank
sendtag = msg_tag(i, local_rank, left_rank, d, 0)
if src_data_on_device:
tmp = self.host_backend.empty(
shape=shape, dtype=base_dtype
)
send_buf = tmp.handle
mpi_type = base_mpi_type.Create_contiguous(
send_buf.size
)
mpi_type.Commit()
lp.i_src_buffers += ((tmp, buf, outer_left),)
else:
send_buf = buf.handle
mpi_type = TopoTools.create_subarray_from_buffer(
send_buf, outer_left
)
assert all(send_buf[outer_left].shape == shape)
send_kwds = {
"buf": [send_buf, mpi_type],
"dest": left_rank,
"tag": sendtag,
}
lp.isend_kwds.append(send_kwds)
msg = "\t\t>P{}.ISend(shape={}, dtype={}, tag={}) outer left data to left neighbour process P{}."
msg = msg.format(
local_rank, shape, send_buf.dtype, sendtag, left_rank
)
gprint(msg)
# receive outer right ghosts data from left rank in a tmp buffer
recvtag = msg_tag(i, left_rank, local_rank, d, 1)
mpi_type = base_mpi_type.Create_contiguous(
buf[inner_left].size
)
mpi_type.Commit()
tmp = self.host_backend.empty_like(
send_buf, shape=shape, dtype=base_dtype
)
recv_kwds = {
"buf": [tmp.handle, mpi_type],
"source": left_rank,
"tag": recvtag,
}
lp.irecv_kwds.append(recv_kwds)
lp.i_dst_buffers.append((tmp, buf, inner_left))
msg = "\t\t>P{}.IRecv(shape={}, dtype={}, tag={}) inner left data from left neighbour process P{}."
msg = msg.format(
local_rank, shape, tmp.dtype, recvtag, left_rank
)
gprint(msg)
if should_exchange_to_right:
# Exchanges with right neighour
right_rank = neighbour_ranks[1, d]
assert (right_rank != local_rank) and (right_rank != -1)
# send outer right to right rank
sendtag = msg_tag(i, local_rank, right_rank, d, 1)
if src_data_on_device:
tmp = self.host_backend.empty(
shape=shape, dtype=base_dtype
)
send_buf = tmp.handle
mpi_type = base_mpi_type.Create_contiguous(
send_buf.size
)
mpi_type.Commit()
lp.i_src_buffers += ((tmp, buf, outer_right),)
else:
send_buf = buf.handle
mpi_type = TopoTools.create_subarray_from_buffer(
send_buf, outer_right
)
assert all(send_buf[outer_right].shape == shape)
send_kwds = {
"buf": [send_buf, mpi_type],
"dest": right_rank,
"tag": sendtag,
}
lp.isend_kwds.append(send_kwds)
msg = "\t\t>P{}.ISend(shape={}, dtype={}, tag={}) outer right data to right neighbour process P{}."
msg = msg.format(
local_rank, shape, send_buf.dtype, sendtag, right_rank
)
gprint(msg)
# receive outer left ghosts data from right rank in a tmp buffer
recvtag = msg_tag(i, right_rank, local_rank, d, 0)
mpi_type = base_mpi_type.Create_contiguous(
buf[inner_right].size
)
mpi_type.Commit()
tmp = self.host_backend.empty(shape=shape, dtype=base_dtype)
recv_kwds = {
"buf": [tmp.handle, mpi_type],
"source": right_rank,
"tag": recvtag,
}
lp.irecv_kwds.append(recv_kwds)
lp.i_dst_buffers.append((tmp, buf, inner_right))
msg = "\t\t>P{}.IRecv(shape={}, dtype={}, tag={}) inner right data from right neighbour process P{}."
msg = msg.format(
local_rank, shape, tmp.dtype, recvtag, right_rank
)
gprint(msg)
lp.from_buffer = src_data_on_device
lp.to_buffer = True
else:
msg = f"Unknown MPI exchange method {exchange_method}."
raise NotImplementedError(msg)
else:
msg = f"Unknown GhostOperation {ghost_op}."
raise NotImplementedError(msg)
msg = "Something went wrong while initializing LauncherParameters::{}::{}, got value {{}}."
msg = msg.format(ghost_op, exchange_method)
assert lp.from_buffer is not None, msg.format(lp.from_buffer)
assert lp.to_buffer is not None, msg.format(lp.to_buffer)
# handle all_to_all_w kwds (one call for all neighbours per buffer)
if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W:
sendtypes, recvtypes = (), () # subarray types
sendcounts, recvcounts = (), () # in type counts
sdispls, rdispls = (), () # in bytes
sdispl, rdispl = 0, 0
scount, rcount = 0, 0
src_buffers, dst_buffers = (), ()
for direction in range(comm.Get_dim()):
for tag, rank in zip(("left", "right"), comm.Shift(direction, 1)):
displ = 0
if (tag, rank) in lp.w_send_requests:
requests = lp.w_send_requests[(tag, rank)]
assert len(requests) == 1
(src, slc) = requests[0]
assert src is buf
assert src.dtype == base_dtype
if lp.from_buffer:
mpi_type = base_mpi_type
send_count = src[slc].size
displ = src[slc].nbytes
vslc = slice(scount, scount + send_count)
src_buffers += ((src[slc], vslc),)
else:
mpi_type = TopoTools.create_subarray_from_buffer(
src, slc
)
send_count = 1
else:
mpi_type = base_mpi_type
send_count = 0
sendtypes += (mpi_type,)
sendcounts += (send_count,)
sdispls += (sdispl,)
sdispl += displ
scount += send_count
displ = 0
if (tag, rank) in lp.w_recv_requests:
requests = lp.w_recv_requests[(tag, rank)]
assert len(requests) == 1
(dst, slc) = requests[0]
assert dst is buf
assert dst.dtype == base_dtype
if lp.to_buffer:
mpi_type = base_mpi_type
recv_count = dst[slc].size
displ = dst[slc].nbytes
vslc = slice(rcount, rcount + recv_count)
dst_buffers += ((dst[slc], vslc),)
else:
mpi_type = TopoTools.create_subarray_from_buffer(
dst, slc
)
recv_count = 1
else:
mpi_type = base_mpi_type
recv_count = 0
recvtypes += (mpi_type,)
recvcounts += (recv_count,)
rdispls += (rdispl,)
rdispl += displ
rcount += recv_count
if lp.from_buffer:
assert scount > 0, scount
send_size = scount
send_buffer = self.host_backend.empty(
shape=(send_size,), dtype=base_dtype
)
sendbuf = send_buffer.handle
else:
send_buffer = None
sendbuf = buf.handle
if lp.to_buffer:
assert rcount > 0, rcount
recv_size = rcount
recv_buffer = self.host_backend.empty(
shape=(recv_size,), dtype=base_dtype
)
recvbuf = recv_buffer.handle
else:
recv_buffer = None
recvbuf = buf.handle
lp.w_src_buffers.append(src_buffers)
lp.w_send_buffer.append(send_buffer)
lp.w_dst_buffers.append(dst_buffers)
lp.w_recv_buffer.append(recv_buffer)
lp.w_kwds.append(
{
"sendbuf": [sendbuf, sendcounts, sdispls, sendtypes],
"recvbuf": [recvbuf, recvcounts, rdispls, recvtypes],
}
)
# handle all_to_all_v kwds (one call for all buffers and all neighbours)
if lp.has_mpi_exchanges and (
exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V
):
assert lp.v_send_requests
assert lp.v_recv_requests
sendtype, recvtype = base_mpi_type, base_mpi_type
sendcounts, recvcounts = (), () # in type counts
sdispls, rdispls = (), () # in type counts
send_disp, recv_disp = 0, 0 # in type counts
src_buffers, dst_buffers = (), ()
send_buffer, recv_buffer = None, None
for direction in range(comm.Get_dim()):
for tag, rank in zip(("left", "right"), comm.Shift(direction, 1)):
send_count, recv_count = 0, 0
if (tag, rank) in lp.v_send_requests:
for src in lp.v_send_requests[(tag, rank)]:
assert src.dtype == base_dtype
slc = slice(
send_disp + send_count,
send_disp + send_count + src.size,
)
src_buffers += ((src, (slc,)),)
send_count += src.size
sendcounts += (send_count,)
sdispls += (send_disp,)
send_disp += send_count
if (tag, rank) in lp.v_recv_requests:
for dst in lp.v_recv_requests[(tag, rank)]:
assert dst.dtype == base_dtype
slc = slice(
recv_disp + recv_count,
recv_disp + recv_count + dst.size,
)
dst_buffers += ((dst, (slc,)),)
recv_count += dst.size
recvcounts += (recv_count,)
rdispls += (recv_disp,)
recv_disp += recv_count
assert send_disp == sum(sendcounts) > 0, send_disp
send_size = send_disp
send_buffer = self.host_backend.empty(shape=(send_size,), dtype=base_dtype)
assert recv_disp == sum(recvcounts) > 0, recv_disp
recv_size = recv_disp
recv_buffer = self.host_backend.empty(shape=(recv_size,), dtype=base_dtype)
lp.v_kwds = {
"sendbuf": [send_buffer.handle, sendcounts, sdispls, sendtype],
"recvbuf": [recv_buffer.handle, recvcounts, rdispls, recvtype],
}
lp.v_src_buffers = src_buffers
lp.v_dst_buffers = dst_buffers
lp.v_send_buffer = send_buffer
lp.v_recv_buffer = recv_buffer
return lp.setup()
def _build_python_launcher(self):
assert self._launcher is None
ghost_op = self.ghost_op
exchange_method = self.exchange_method
comm = self.topology.comm
base_dtype = self.base_dtype
lp = self._prepare_launcher()
if ghost_op is GhostOperation.EXCHANGE:
# Warning: in EXCHANGE operation:
# cannot overlap comm with local exchanges (because of edges and corners)
# mpi buffers are the only buffer where all exchanges take place
def local_exchanges(lp=lp):
for fn, buf, slices, _, _, _, _ in lp.local_symmetries:
gprint(" local_symmetry")
fn(buf[slices])
for buf, outer, inner, shape, direction in lp.local_exchanges:
gprint(" local_exchange")
buf[outer] = buf[inner]
for buf, slc, shape, val in lp.diagonal_ghosts:
gprint(" diagonal ghosts")
buf[slc] = val
if lp.has_mpi_exchanges:
if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W:
assert len(lp.w_kwds) > 0
assert lp.from_buffer is False
assert lp.to_buffer is False
def python_launcher(lp=lp):
evts = []
for kwds in lp.w_kwds:
evt = comm.Ineighbor_alltoallw(**kwds)
evts.append(evt)
MPI.Request.Waitall(evts)
local_exchanges() # perform local exchanges after comm end
elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V:
assert lp.v_kwds
assert lp.from_buffer is True, lp.from_buffer
assert lp.to_buffer is True, lp.to_buffer
def python_launcher(lp=lp):
for src, slc in lp.v_src_buffers:
lp.v_send_buffer[slc] = src.ravel()
evt = comm.Ineighbor_alltoallv(**lp.v_kwds)
evt.wait()
for dst, slc in lp.v_dst_buffers:
dst[...] = lp.v_recv_buffer[slc].reshape(dst.shape)
local_exchanges() # perform local exchanges after comm end
elif exchange_method is ExchangeMethod.ISEND_IRECV:
assert lp.irecv_kwds
assert lp.isend_kwds
assert lp.from_buffer is False
assert lp.to_buffer is False
def python_launcher(lp=lp):
evts = []
for recv_kwds in lp.irecv_kwds:
gprint(
" Irecv(source={}, tag={})".format(
recv_kwds["source"], recv_kwds["tag"]
)
)
evt = comm.Irecv(**recv_kwds)
evts.append(evt)
for send_kwds in lp.isend_kwds:
gprint(
" Isend(dest={}, tag={})".format(
send_kwds["dest"], send_kwds["tag"]
)
)
evt = comm.Isend(**send_kwds)
evts.append(evt)
MPI.Request.Waitall(evts)
local_exchanges() # perform local exchanges after comm end
else:
msg = f"Unknown MPI exchange method {exchange_method}."
raise NotImplementedError(msg)
else:
python_launcher = local_exchanges
elif ghost_op is GhostOperation.ACCUMULATE:
# Warning: in ACCUMULATE operations:
# can overlap mpi comm with local exchanges because mpi uses distinct buffers
def local_exchanges(lp=lp):
for buf, outer, inner, shape, direction in lp.local_exchanges:
buf[inner] += buf[outer]
if lp.has_mpi_exchanges:
if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W:
assert len(lp.w_kwds) > 0
assert lp.from_buffer is False
assert lp.to_buffer is True
def python_launcher(lp=lp):
evts = []
for kwds, send_buffer, src_buffers in zip(
lp.w_kwds, lp.w_send_buffer, lp.w_src_buffers
):
for src, slc in src_buffers:
send_buffer[slc] = src.ravel()
evt = comm.Ineighbor_alltoallw(**kwds)
evts.append(evt)
local_exchanges()
for idx in iter_mpi_requests(evts):
for dst, slc in lp.w_dst_buffers[idx]:
dst[...] += lp.w_recv_buffer[idx][slc].reshape(
dst.shape
)
elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V:
assert lp.v_kwds
assert lp.from_buffer is True
assert lp.to_buffer is True
def python_launcher(lp=lp):
for src, slc in lp.v_src_buffers:
lp.v_send_buffer[slc] = src.ravel()
evt = comm.Ineighbor_alltoallv(**lp.v_kwds)
local_exchanges()
evt.wait()
for dst, slc in lp.v_dst_buffers:
dst[...] += lp.v_recv_buffer[slc].reshape(dst.shape)
elif exchange_method is ExchangeMethod.ISEND_IRECV:
assert lp.from_buffer is False
assert lp.to_buffer is True
assert len(lp.irecv_kwds) == len(lp.isend_kwds) > 0
def python_launcher(lp=lp):
send_evts = []
recv_evts = []
for recv_kwds in lp.irecv_kwds:
gprint(
" Irecv(source={}, tag={})".format(
recv_kwds["source"], recv_kwds["tag"]
)
)
gprint_buffer(
" >receiving", recv_kwds["buf"], no_data=True
)
evt = comm.Irecv(**recv_kwds)
recv_evts.append(evt)
for send_kwds in lp.isend_kwds:
gprint(
" Isend(dest={}, tag={})".format(
send_kwds["dest"], send_kwds["tag"]
)
)
gprint_buffer(" >sending", send_kwds["buf"])
evt = comm.Isend(**send_kwds)
send_evts.append(evt)
local_exchanges()
for idx in iter_mpi_requests(recv_evts):
(tmp, buf, inner) = lp.i_dst_buffers[idx]
gprint_buffer(" Received", tmp)
gprint_buffer(" Summing to", buf[inner])
buf[inner] += tmp
gprint_buffer(" Result is", buf[inner])
MPI.Request.Waitall(send_evts)
else:
msg = f"Unknown MPI exchange method {exchange_method}."
raise NotImplementedError(msg)
else:
python_launcher = local_exchanges
else:
msg = f"Unknown GhostOperation {ghost_op}."
raise NotImplementedError(msg)
self._launcher = python_launcher
def _build_opencl_launcher(self):
assert self._launcher is None
from hysop.backend.device.opencl.opencl_kernel_launcher import (
OpenClKernelListLauncher,
HostLauncherI,
)
from hysop.backend.device.opencl.opencl_copy_kernel_launchers import (
OpenClCopyBufferRectLauncher,
)
class FunctionLauncher(HostLauncherI):
def __init__(self, name, fn):
super().__init__(name=name)
self._fn = fn
def __call__(self, *args, **kwds):
super().__call__()
return self._fn(*args, **kwds)
class MPIGhostExchangeLauncher(FunctionLauncher):
def __init__(self, fn):
super().__init__(name="MPI_Ghost_Exchange_Launcher", fn=fn)
lp = self._prepare_launcher()
dim = self.dim
ghost_op = self.ghost_op
exchange_method = self.exchange_method
comm = self.topology.comm
base_dtype = self.base_dtype
# generate the minimal number of temporary backend buffers
tmp_buffers = {}
def mk_tmp(shape, dtype, color=0, host=False):
nbytes = np.prod(shape, dtype=np.int64) * dtype.itemsize
key = (host, nbytes, color)
if key in tmp_buffers:
tmp = tmp_buffers[key].reshape(shape).view(dtype=dtype)
elif host:
tmp = self.host_backend.empty(shape=shape, dtype=dtype)
tmp_buffers[key] = tmp
else:
tmp = self.backend.empty(shape=shape, dtype=dtype)
tmp_buffers[key] = tmp
return tmp
if ghost_op is GhostOperation.EXCHANGE:
name = f"local_ghosts_exchanges_{self.name}"
local_kl = OpenClKernelListLauncher(name=name)
for fn, buf, slices, shape, direction, to_left, bc in lp.local_symmetries:
# SYMMETRIC OR ANTISYMMETRIC LOCAL EXCHANGE
dirlabel = DirectionLabels[dim - direction - 1]
vname = "{}_{}_{}_{}".format(
self.name,
dirlabel,
"left" if to_left else "right",
str(bc.bc).lower(),
)
tmp = mk_tmp(shape=shape, dtype=base_dtype, host=True)
k0 = OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_boundary_layer",
src=buf,
src_slices=slices,
dst=tmp,
)
k1 = OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_boundary_layer",
src=tmp,
dst=buf,
dst_slices=slices,
)
def apply_fn(f=fn, X=tmp, k0=k0, k1=k1, **kwds):
evt = k0(**kwds)
evt.wait()
f(X)
evt = k1(**kwds)
return evt
local_kl += FunctionLauncher(vname + "_apply_on_host", apply_fn)
for i, (buf, outer_slc, inner_slc, shape, direction) in enumerate(
lp.local_exchanges
):
# PERIODIC LOCAL EXCHANGE
dirlabel = DirectionLabels[dim - direction - 1]
vname = f"{self.name}_{i}_{dirlabel}"
# some opencl platforms reject inplace buffer copies
# so we use a tmp buffer to perform local ghost exchanges
tmp = mk_tmp(shape=shape, dtype=base_dtype)
# exchange all left inner ghosts to right outer ghosts
local_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_inner_to_tmp_ghost_buffer",
src=buf,
src_slices=inner_slc,
dst=tmp,
)
local_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_tmp_ghost_buffer_to_outer",
src=tmp,
dst=buf,
dst_slices=outer_slc,
)
for i, (buf, slc, shape, value) in enumerate(lp.diagonal_ghosts):
vname = f"{self.name}_diagonal_{i}"
tmp = self.backend.empty(shape=shape, dtype=base_dtype)
tmp.fill(value)
local_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname, src=tmp, dst=buf, dst_slices=slc
)
kl = OpenClKernelListLauncher(name=f"exchange_ghosts_{self.name}")
if lp.has_mpi_exchanges:
assert lp.from_buffer is True
assert lp.to_buffer is True
if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W:
assert len(lp.w_kwds) > 0
d2h_kls = []
h2d_kls = []
for i, (send_buffer, src_buffers) in enumerate(
zip(lp.w_send_buffer, lp.w_src_buffers)
):
d2h_kl_i = OpenClKernelListLauncher(
f"inner_ghosts_D2H_exchanges_{self.name}_{i}"
)
for j, (src, dst_slc) in enumerate(src_buffers):
vname = f"{self.name}_{i}_{j}"
dst = send_buffer[dst_slc].reshape(src.shape)
d2h_kl_i += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_inner_ghosts_D2H",
src=src,
src_slices=None,
dst=dst,
dst_slices=None,
)
d2h_kls.append(d2h_kl_i)
for i, (recv_buffer, dst_buffers) in enumerate(
zip(lp.w_recv_buffer, lp.w_dst_buffers)
):
h2d_kl_i = OpenClKernelListLauncher(
f"outer_ghosts_H2D_exchanges_{self.name}_{i}"
)
for j, (dst, src_slc) in enumerate(dst_buffers):
src = recv_buffer[src_slc].reshape(dst.shape)
h2d_kl_i += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_outer_ghosts_H2D",
src=src,
src_slices=None,
dst=dst,
dst_slices=None,
)
h2d_kls.append(h2d_kl_i)
def opencl_launcher(**kwds):
d2h_evts, mpi_evts = [], []
for kl in d2h_kls:
evt = kl(**kwds)
d2h_evts.append(evt)
local_kl(**kwds)
for evt, w_kwds in zip(d2h_evts, lp.w_kwds):
evt.wait()
mpi_evt = comm.Ineighbor_alltoallw(**w_kwds)
mpi_evts.append(mpi_evt)
for idx in iter_mpi_requests(mpi_evts):
evt = h2d_kls[idx](**kwds)
return evt
kl += MPIGhostExchangeLauncher(opencl_launcher)
elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V:
assert lp.v_kwds
d2h_kl = OpenClKernelListLauncher(
f"inner_ghosts_D2H_exchanges_{self.name}"
)
h2d_kl = OpenClKernelListLauncher(
f"outer_ghosts_H2D_exchanges_{self.name}"
)
for i, (src, dst_slc) in enumerate(lp.v_src_buffers):
vname = f"{self.name}_{i}"
dst = lp.v_send_buffer[dst_slc].reshape(src.shape)
d2h_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_inner_ghosts_D2H",
src=src,
src_slices=None,
dst=dst,
dst_slices=None,
)
for i, (dst, src_slc) in enumerate(lp.v_dst_buffers):
vname = f"{self.name}_{i}"
src = lp.v_recv_buffer[src_slc].reshape(dst.shape)
h2d_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_outer_ghosts_H2D",
src=src,
src_slices=None,
dst=dst,
dst_slices=None,
)
def opencl_launcher(**kwds):
evt = d2h_kl(**kwds)
evt.wait()
mpi_evt = comm.Ineighbor_alltoallv(**lp.v_kwds)
local_kl(**kwds)
mpi_evt.wait()
evt = h2d_kl(**kwds)
return evt
kl += MPIGhostExchangeLauncher(opencl_launcher)
elif exchange_method is ExchangeMethod.ISEND_IRECV:
assert len(lp.i_src_buffers) == len(lp.i_dst_buffers) > 0
assert len(lp.isend_kwds) == len(lp.irecv_kwds) > 0
assert len(lp.isend_kwds) == len(lp.i_src_buffers)
d2h_kls = []
h2d_kls = []
for i, (dst, src, src_slc) in enumerate(lp.i_src_buffers):
vname = f"{self.name}_{i}"
d2h_kl = OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_inner_ghosts_D2H",
src=src,
src_slices=src_slc,
dst=dst,
dst_slices=None,
)
d2h_kls.append(d2h_kl)
for i, (src, dst, dst_slc) in enumerate(lp.i_dst_buffers):
vname = f"{self.name}_{i}"
h2d_kl = OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_outer_ghosts_H2D",
src=src,
src_slices=None,
dst=dst,
dst_slices=dst_slc,
)
h2d_kls.append(h2d_kl)
def opencl_launcher(**kwds):
d2h_evts, mpi_send_evts, mpi_recv_evts = [], [], []
for kl in d2h_kls:
evt = kl(**kwds)
d2h_evts.append(evt)
local_kl(**kwds)
for recv_kwds in lp.irecv_kwds:
mpi_evt = comm.Irecv(**recv_kwds)
mpi_recv_evts.append(mpi_evt)
for evt, send_kwds in zip(d2h_evts, lp.isend_kwds):
evt.wait()
mpi_evt = comm.Isend(**send_kwds)
mpi_send_evts.append(mpi_evt)
for idx in iter_mpi_requests(mpi_recv_evts):
evt = h2d_kls[idx](**kwds)
MPI.Request.Waitall(mpi_send_evts)
return evt
kl += MPIGhostExchangeLauncher(opencl_launcher)
else:
msg = f"Unknown MPI exchange method {exchange_method}."
raise NotImplementedError(msg)
else:
kl += local_kl
elif ghost_op is GhostOperation.ACCUMULATE:
name = f"local_ghosts_accumulate_{self.name}"
local_kl = OpenClKernelListLauncher(name=name)
for i, (buf, outer_slc, inner_slc, shape, direction) in enumerate(
lp.local_exchanges
):
dirlabel = DirectionLabels[dim - direction - 1]
vname = f"{self.name}_{i}_{dirlabel}"
ltmp = mk_tmp(shape=shape, dtype=base_dtype, color=0)
rtmp = mk_tmp(shape=shape, dtype=base_dtype, color=1)
# accumulate outer ghosts to inner ghosts
local_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_inner_to_tmp_ghost_buffer_0",
src=buf,
src_slices=inner_slc,
dst=ltmp,
)
local_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_outer_to_tmp_ghost_buffer_1",
src=buf,
src_slices=outer_slc,
dst=rtmp,
)
local_kl += self.backend.add(
x1=ltmp, x2=rtmp, out=ltmp, build_kernel_launcher=True
)
local_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_accumulation_to_inner",
src=ltmp,
dst=buf,
dst_slices=inner_slc,
)
kl = OpenClKernelListLauncher(name=f"accumulate_ghosts_{self.name}")
if lp.has_mpi_exchanges:
if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W:
assert len(lp.w_kwds) > 0
d2h_kls = []
h2d_kls = []
for i, (send_buffer, src_buffers) in enumerate(
zip(lp.w_send_buffer, lp.w_src_buffers)
):
d2h_kl_i = OpenClKernelListLauncher(
f"outer_ghosts_D2H_exchanges_{self.name}_{i}"
)
for j, (src, dst_slc) in enumerate(src_buffers):
vname = f"{self.name}_{i}_{j}"
dst = send_buffer[dst_slc].reshape(src.shape)
d2h_kl_i += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_inner_ghosts_D2H",
src=src,
src_slices=None,
dst=dst,
dst_slices=None,
)
d2h_kls.append(d2h_kl_i)
for i, (recv_buffer, dst_buffers) in enumerate(
zip(lp.w_recv_buffer, lp.w_dst_buffers)
):
h2d_kl_i = OpenClKernelListLauncher(
f"inner_ghosts_H2D_exchanges_{self.name}_{i}"
)
for j, (dst, src_slc) in enumerate(dst_buffers):
vname = f"{self.name}_{i}_{j}"
shape = dst.shape
outer = recv_buffer[src_slc].reshape(shape)
inner = dst
ltmp = mk_tmp(shape=shape, dtype=base_dtype, color=0)
rtmp = mk_tmp(shape=shape, dtype=base_dtype, color=1)
h2d_kl_i += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_outer_to_tmp_ghost_buffer_0",
src=outer,
src_slices=None,
dst=ltmp,
dst_slices=None,
)
h2d_kl_i += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_inner_to_tmp_ghost_buffer_1",
src=inner,
src_slices=None,
dst=rtmp,
dst_slices=None,
)
h2d_kl_i += self.backend.add(
x1=ltmp, x2=rtmp, out=ltmp, build_kernel_launcher=True
)
h2d_kl_i += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_accumulation_to_inner",
src=ltmp,
dst=inner,
)
h2d_kls.append(h2d_kl_i)
def opencl_launcher(**kwds):
d2h_evts, mpi_evts = [], []
for kl in d2h_kls:
evt = kl(**kwds)
d2h_evts.append(evt)
local_kl(**kwds)
for evt, w_kwds in zip(d2h_evts, lp.w_kwds):
evt.wait()
mpi_evt = comm.Ineighbor_alltoallw(**w_kwds)
mpi_evts.append(mpi_evt)
for idx in iter_mpi_requests(mpi_evts):
evt = h2d_kls[idx](**kwds)
return evt
kl += MPIGhostExchangeLauncher(opencl_launcher)
elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V:
assert lp.v_kwds
assert lp.from_buffer is True
assert lp.to_buffer is True
d2h_kl = OpenClKernelListLauncher(
f"inner_ghosts_D2H_exchanges_{self.name}"
)
h2d_kl = OpenClKernelListLauncher(
f"outer_ghosts_H2D_exchanges_{self.name}"
)
for i, (src, dst_slc) in enumerate(lp.v_src_buffers):
vname = f"{self.name}_{i}"
dst = lp.v_send_buffer[dst_slc].reshape(src.shape)
d2h_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_inner_ghosts_D2H",
src=src,
src_slices=None,
dst=dst,
dst_slices=None,
)
for i, (dst, src_slc) in enumerate(lp.v_dst_buffers):
vname = f"{self.name}_{i}"
shape = dst.shape
outer = lp.v_recv_buffer[src_slc].reshape(shape)
inner = dst
ltmp = mk_tmp(shape=shape, dtype=base_dtype, color=0)
rtmp = mk_tmp(shape=shape, dtype=base_dtype, color=1)
h2d_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_outer_to_tmp_ghost_buffer_0",
src=outer,
src_slices=None,
dst=ltmp,
dst_slices=None,
)
h2d_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_inner_to_tmp_ghost_buffer_1",
src=inner,
src_slices=None,
dst=rtmp,
dst_slices=None,
)
h2d_kl += self.backend.add(
x1=ltmp, x2=rtmp, out=ltmp, build_kernel_launcher=True
)
h2d_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_accumulation_to_inner",
src=ltmp,
dst=inner,
)
def opencl_launcher(**kwds):
evt = d2h_kl(**kwds)
evt.wait()
mpi_evt = comm.Ineighbor_alltoallv(**lp.v_kwds)
local_kl(**kwds)
mpi_evt.wait()
evt = h2d_kl(**kwds)
return evt
kl += MPIGhostExchangeLauncher(opencl_launcher)
elif exchange_method is ExchangeMethod.ISEND_IRECV:
assert len(lp.i_src_buffers) == len(lp.i_dst_buffers) > 0
assert len(lp.isend_kwds) == len(lp.irecv_kwds) > 0
assert len(lp.isend_kwds) == len(lp.i_src_buffers)
d2h_kls = []
h2d_kls = []
for i, (dst, src, src_slc) in enumerate(lp.i_src_buffers):
vname = f"{self.name}_{i}"
d2h_kl = OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_outer_ghosts_D2H",
src=src,
src_slices=src_slc,
dst=dst,
dst_slices=None,
)
d2h_kls.append(d2h_kl)
for i, (src, dst, dst_slc) in enumerate(lp.i_dst_buffers):
shape = src.shape
outer = src
inner = dst[dst_slc]
ltmp = mk_tmp(shape=shape, dtype=base_dtype, color=0)
rtmp = mk_tmp(shape=shape, dtype=base_dtype, color=1)
vname = f"{self.name}_{i}"
h2d_kl = OpenClKernelListLauncher(
f"outer_ghosts_H2D_exchanges_{self.name}_{i}"
)
h2d_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_outer_to_tmp_ghost_buffer_0",
src=outer,
src_slices=None,
dst=ltmp,
dst_slices=None,
)
h2d_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_inner_to_tmp_ghost_buffer_1",
src=inner,
src_slices=None,
dst=rtmp,
dst_slices=None,
)
h2d_kl += self.backend.add(
x1=ltmp, x2=rtmp, out=ltmp, build_kernel_launcher=True
)
h2d_kl += OpenClCopyBufferRectLauncher.from_slices(
varname=vname + "_accumulation_to_inner",
src=ltmp,
dst=inner,
)
h2d_kls.append(h2d_kl)
def opencl_launcher(**kwds):
d2h_evts, mpi_send_evts, mpi_recv_evts = [], [], []
for kl in d2h_kls:
evt = kl(**kwds)
d2h_evts.append(evt)
local_kl(**kwds)
for recv_kwds in lp.irecv_kwds:
mpi_evt = comm.Irecv(**recv_kwds)
mpi_recv_evts.append(mpi_evt)
for evt, send_kwds in zip(d2h_evts, lp.isend_kwds):
evt.wait()
mpi_evt = comm.Isend(**send_kwds)
mpi_send_evts.append(mpi_evt)
for idx in iter_mpi_requests(mpi_recv_evts):
evt = h2d_kls[idx](**kwds)
MPI.Request.Waitall(mpi_send_evts)
return evt
kl += MPIGhostExchangeLauncher(opencl_launcher)
else:
msg = f"Unknown MPI exchange method {exchange_method}."
raise NotImplementedError(msg)
else:
kl += local_kl
self._launcher = kl